#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "thread_pool.h"

#define MAX_THREAD 10000

#define LL_ADD(item, list) { \
    item->prev = NULL; \
    item->next = list; \
    list = item; \
}

#define LL_REMOVE(item, list) { \
    if (item->prev != NULL) item->prev->next = item->next; \
    if (item->next != NULL) item->next->prev = item->prev; \
    if (list == item) list = item->next; \
    item->prev = item->next = NULL; \
}

typedef struct worker {
    pthread_t thread;
    int terminate;
    struct workqueue *workqueue;
    struct worker *prev;
    struct worker *next;
} worker_t;

typedef struct workqueue {
    struct worker *workers;
    struct job *waiting_jobs;
    pthread_mutex_t jobs_mutex;
    pthread_cond_t jobs_cond;
} workqueue_t;

static
void *worker_function(void *ptr) {
    worker_t *worker = (worker_t *)ptr;
    job_t *job;

    while (1) {
        pthread_mutex_lock(&worker->workqueue->jobs_mutex);
        while (worker->workqueue->waiting_jobs == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mutex);
        }
        if (worker->terminate) break;
        job = worker->workqueue->waiting_jobs;
        if (job != NULL) {
            LL_REMOVE(job, worker->workqueue->waiting_jobs);
        }
        pthread_mutex_unlock(&worker->workqueue->jobs_mutex);

        if (job == NULL) continue;

        /* Execute the job. */
        job->job_function(job);
    }

    free(worker);
    pthread_exit(NULL);
}

static
int workqueue_init(workqueue_t *workqueue, int numWorkers) {
    int i;
    worker_t *worker;
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;

    if (numWorkers < 1) numWorkers = 1;

    memset(workqueue, 0, sizeof(*workqueue));
    memcpy(&workqueue->jobs_mutex, &blank_mutex, sizeof(workqueue->jobs_mutex));
    memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));

    for (i = 0; i < numWorkers; i++) {
        if ((worker = (worker_t*)malloc(sizeof(worker_t))) == NULL) {
            perror("Failed to allocate all workers");
            return 1;
        }

        memset(worker, 0, sizeof(*worker));
        worker->workqueue = workqueue;

        if (pthread_create(&worker->thread, NULL, worker_function, (void *)worker)) {
            perror("Failed to start all worker threads");
            free(worker);
            return 1;
        }

        LL_ADD(worker, worker->workqueue->workers);
    }

    return 0;
}


static
void workqueue_shutdown(workqueue_t *workqueue) {

    worker_t *worker = NULL;
    for (worker = workqueue->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }


    pthread_mutex_lock(&workqueue->jobs_mutex);
    workqueue->workers = NULL;
    workqueue->waiting_jobs = NULL;
    pthread_cond_broadcast(&workqueue->jobs_cond);
    pthread_mutex_unlock(&workqueue->jobs_mutex);

}


static
void workqueue_add_job(workqueue_t *workqueue, job_t *job) {

    pthread_mutex_lock(&workqueue->jobs_mutex);

    LL_ADD(job, workqueue->waiting_jobs);

    pthread_cond_signal(&workqueue->jobs_cond);
    pthread_mutex_unlock(&workqueue->jobs_mutex);

}

static workqueue_t g_workqueue;

void threadpool_init(void) {
    workqueue_init(&g_workqueue, MAX_THREAD);
}
void threadpool_fini(void) {
    workqueue_shutdown(&g_workqueue);
}
void threadpool_add_job(job_t* job)
{
    workqueue_add_job(&g_workqueue, job);
}
